The late-night Apple event reminded me of Steve Jobs's line, "Stay Hungry. Stay Foolish".
“Youth is happy because it has the capacity to see beauty. Anyone who keeps the ability to see beauty never grows old.”
― Franz Kafka
Before I knew it, I'm almost thirty myself.
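Let's start from the smallest unit, a single message. Every message in a Kafka topic consists of a key-value pair ( key-value ) and a timestamp ( timestamp ).
The key ( key ) is optional. When it is not set, messages are by default spread evenly across all partitions. When it is set, it decides which partition the message is written to; the most common approach is to divide the key's hash by the number of partitions and assign the message to the partition whose number equals the remainder.
The timestamp ( timestamp ) is used when splitting log files.
The value ( value ) is usually stored as JSON (JavaScript Object Notation). It is a good idea to consistently add some labels or descriptions so that messages are more readable and easier to understand.
Messages can be sent one at a time, but at high volume that costs a lot of network traffic, so Kafka writes in batches. Batching inevitably adds write latency, so this has to be weighed per use case: does I/O throughput matter more, or low latency?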
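To make the key rule above concrete, here is a minimal sketch of "hash modulo partition count". The function name choose_partition, the round-robin counter, and the use of zlib.crc32 are all assumptions for illustration; Kafka's own partitioner uses a different hash function, so the partition numbers below will not match a real cluster.

```python
import zlib
from itertools import count


def choose_partition(key, num_partitions, counter):
    """Pick a partition number for one message (illustrative only)."""
    if key is None:
        # No key: rotate through the partitions so messages spread evenly.
        return next(counter) % num_partitions
    # Key set: hash it, then take the remainder by the partition count.
    # zlib.crc32 is a stand-in hash, NOT the hash Kafka actually uses.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


round_robin = count()
for key in ["user-1", "user-2", "user-1", None, None, None]:
    print(key, "->", choose_partition(key, 3, round_robin))
```

The point to notice is that the same key always lands on the same partition, while messages without a key are spread across all of them.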
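Kafka's built-in kafka-run-class kafka.tools.DumpLogSegments command lets you inspect the contents of log files. The --files option is required, and several files can be queried at once by separating them with commas. These commands are the real gems and will be covered in more detail later.
The main point here is to show what metadata a message carries. In general, what you need to know is the key, value, and timestamp mentioned above, plus the offset.
You can see that Kafka first accumulates messages into a batch and sends it once enough have piled up. Taking the following batch as an example, three messages ( count: 3 ) were accumulated and sent in one go.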
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
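The accumulate-then-send behaviour can be sketched roughly like this. The Batcher class and its max_bytes threshold are hypothetical names made up for this sketch; the real producer decides when to send based on its own settings (for example batch.size and linger.ms), which this toy version does not model.

```python
class Batcher:
    """Collect messages and hand them over as one batch once enough have piled up."""

    def __init__(self, max_bytes, send):
        self.max_bytes = max_bytes  # hypothetical size threshold for one batch
        self.send = send            # callback that receives a whole batch (a list)
        self.pending = []
        self.pending_bytes = 0

    def add(self, message):
        # If this message would push the batch over the threshold, send first.
        if self.pending and self.pending_bytes + len(message) > self.max_bytes:
            self.flush()
        self.pending.append(message)
        self.pending_bytes += len(message)

    def flush(self):
        # Send whatever has accumulated and start a fresh batch.
        if self.pending:
            self.send(self.pending)
            self.pending = []
            self.pending_bytes = 0


sent = []
batcher = Batcher(max_bytes=14, send=sent.append)
for msg in [b"asdf as", b"sdf", b"asdf", b"as", b"dfa"]:
    batcher.add(msg)
batcher.flush()
print([len(batch) for batch in sent])
```

A larger threshold means fewer, bigger sends (better throughput) at the cost of messages waiting longer before they leave, which is exactly the trade-off described earlier.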
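Below are the log file contents for the three partitions of the topic topic_for_test_log.
Note that the actual message contents are not visible here, only the wrapped-up batches, one after another.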
$ kafka-run-class kafka.tools.DumpLogSegments --files broker1/topic_for_test_log-0/00000000000000000000.log,broker1/topic_for_test_log-1/00000000000000000000.log,broker2/topic_for_test_log-2/00000000000000000000.log
Dumping broker1/topic_for_test_log-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
baseOffset: 3 lastOffset: 4 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 98 CreateTime: 1631771621294 size: 81 magic: 2 compresscodec: NONE crc: 487960023 isvalid: true
baseOffset: 5 lastOffset: 10 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 179 CreateTime: 1631771625952 size: 128 magic: 2 compresscodec: NONE crc: 3083079563 isvalid: true
baseOffset: 11 lastOffset: 16 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 307 CreateTime: 1631771634662 size: 124 magic: 2 compresscodec: NONE crc: 1152822458 isvalid: true
Dumping broker1/topic_for_test_log-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771626645 size: 95 magic: 2 compresscodec: NONE crc: 1577038537 isvalid: true
baseOffset: 3 lastOffset: 9 count: 7 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 95 CreateTime: 1631771633592 size: 135 magic: 2 compresscodec: NONE crc: 372605860 isvalid: true
baseOffset: 10 lastOffset: 15 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 230 CreateTime: 1631771635665 size: 122 magic: 2 compresscodec: NONE crc: 2359930090 isvalid: true
Dumping broker2/topic_for_test_log-2/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 5 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771620918 size: 124 magic: 2 compresscodec: NONE crc: 2581760372 isvalid: true
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 124 CreateTime: 1631771627429 size: 69 magic: 2 compresscodec: NONE crc: 3572829527 isvalid: true
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 193 CreateTime: 1631771635834 size: 69 magic: 2 compresscodec: NONE crc: 1038836103 isvalid: true
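Each batch line in the dump above is just a run of "name: value" pairs, so it is easy to pick apart in a script. The helper names below (parse_batch_line, create_time_utc) are my own, and the sketch assumes every value is a single token without spaces, which holds for the batch lines shown here.

```python
import re
from datetime import datetime, timedelta, timezone


def parse_batch_line(line):
    """Turn one 'name: value name: value ...' batch line into a dict of strings."""
    return dict(re.findall(r"(\w+): (\S+)", line))


def create_time_utc(ms):
    """Convert a CreateTime value (milliseconds since the Unix epoch) to UTC."""
    seconds, millis = divmod(ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


line = (
    "baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 "
    "producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false "
    "isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 "
    "compresscodec: NONE crc: 16374966 isvalid: true"
)
batch = parse_batch_line(line)

# count should equal the number of offsets covered by the batch.
covered = int(batch["lastOffset"]) - int(batch["baseOffset"]) + 1
print(batch["count"], covered)
print(create_time_utc(int(batch["CreateTime"])).isoformat())
```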
To see the content of each message inside every batch (the payload field), you also need to add the --print-data-log option; then the actual records become visible.
$ kafka-run-class kafka.tools.DumpLogSegments --files broker1/topic_for_test_log-0/00000000000000000000.log --print-data-log
Dumping broker1/topic_for_test_log-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
| offset: 0 isValid: true crc: null keySize: -1 valueSize: 7 CreateTime: 1631771618877 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: asdf as
| offset: 1 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771619471 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: sdf
| offset: 2 isValid: true crc: null keySize: -1 valueSize: 4 CreateTime: 1631771619770 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: asdf
baseOffset: 3 lastOffset: 4 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 98 CreateTime: 1631771621294 size: 81 magic: 2 compresscodec: NONE crc: 487960023 isvalid: true
| offset: 3 isValid: true crc: null keySize: -1 valueSize: 2 CreateTime: 1631771621106 baseOffset: 3 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 81 magic: 2 compressType: NONE position: 98 sequence: -1 headerKeys: [] payload: as
| offset: 4 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771621294 baseOffset: 3 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 81 magic: 2 compressType: NONE position: 98 sequence: -1 headerKeys: [] payload: dfa
...
...
baseOffset: 11 lastOffset: 16 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 307 CreateTime: 1631771634662 size: 124 magic: 2 compresscodec: NONE crc: 1152822458 isvalid: true
| offset: 11 isValid: true crc: null keySize: -1 valueSize: 4 CreateTime: 1631771633791 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: asdf
...
...
| offset: 15 isValid: true crc: null keySize: -1 valueSize: 2 CreateTime: 1631771634475 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: fa
| offset: 16 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771634662 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: dsf
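The record lines can be read the same way, with one twist: the payload comes last and may contain spaces, so it has to be split off before parsing the rest. This is only a sketch with a made-up helper name (parse_record_line), and it assumes the payload is plain text as in this dump.

```python
import re


def parse_record_line(line):
    """Split a '| offset: ...' record line into its metadata fields and payload."""
    # Everything after ' payload: ' is the message itself and may contain spaces.
    head, _, payload = line.partition(" payload: ")
    fields = dict(re.findall(r"(\w+): (\S+)", head))
    return fields, payload


line = (
    "| offset: 0 isValid: true crc: null keySize: -1 valueSize: 7 "
    "CreateTime: 1631771618877 baseOffset: 0 lastOffset: 2 baseSequence: -1 "
    "lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 "
    "magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: asdf as"
)
fields, payload = parse_record_line(line)

# valueSize is the size of the value in bytes; keySize -1 means no key was set.
print(fields["offset"], fields["keySize"], fields["valueSize"], repr(payload))
print(int(fields["valueSize"]) == len(payload.encode("utf-8")))
```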
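So now we can see the records, but what does the metadata stored at the very bottom actually mean? Below is a first introduction to each parameter, one by one. Later chapters will use and influence these parameters with hands-on examples, so for now a general impression is enough.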
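baseOffset: 0 : the starting offset of the batch
lastOffset: 2 : the ending offset of the batch
count: 3 : the number of records in the batch
baseSequence: -1 : only takes effect when the producer's idempotent setting ( enable.idempotence ) is true; otherwise it is -1
lastSequence: -1 : only takes effect when the producer's idempotent setting ( enable.idempotence ) is true; otherwise it is -1
producerId: -1 : the producer's ID; it is -1 here because the data was added with the built-in shell
producerEpoch: -1 : the producer's epoch number; it is -1 here because the data was added with the built-in shell
partitionLeaderEpoch: 0 : the leader epoch of the partition leader that handled this data
isTransactional: false : whether the batch is part of a transaction
isControl: false : whether this is a control batch; a control batch contains a single control record, which lets consumers determine whether the batch belongs to a failed (aborted) transaction
CreateTime: 1631771619770 : the timestamp of the batch; with the CreateTime type it is set when the message is created by the producer (the time the partition leader received the message would be LogAppendTime instead)
size: 98 : the size of the batch
magic: 2 : the message format version is V2. Older Kafka releases, from the days of the old consumer and old producer, used the earlier V0 and V1 formats; V2 was introduced in Kafka 0.11 and is what current versions use.
compresscodec: NONE : the compression type; it is NONE here, and enabling compression is generally recommended
crc: 16374966 : the checksum of the batch. The fields of a record batch are laid out as follows: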
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
        4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
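The attributes field packs several flags into one int16, so reading it comes down to masking and shifting bits. Here is a small sketch that follows the bit layout listed above; the function name decode_attributes and the dictionary keys are my own choices.

```python
COMPRESSION = {0: "none", 1: "gzip", 2: "snappy", 3: "lz4", 4: "zstd"}


def decode_attributes(attributes):
    """Unpack the int16 attributes field of a record batch into named flags."""
    return {
        # bits 0~2: compression codec
        "compression": COMPRESSION.get(attributes & 0b111, "unknown"),
        # bit 3: timestampType
        "timestampType": (attributes >> 3) & 1,
        # bit 4: isTransactional (0 means not transactional)
        "isTransactional": bool((attributes >> 4) & 1),
        # bit 5: isControlBatch (0 means not a control batch)
        "isControlBatch": bool((attributes >> 5) & 1),
    }


print(decode_attributes(0b0000_0000))  # all flags off, no compression
print(decode_attributes(0b0001_0010))  # codec 2 (snappy) with the transactional bit set
```

An attributes value of 0 corresponds to what the dump shows for these batches: compresscodec NONE, isTransactional false, isControl false.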
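The CRC covers the data from attributes all the way to the end of the batch. The CRC sits after magic, because the magic byte has to be parsed first to decide how to interpret the bytes between batch length and magic. The partition leader epoch is also outside the range covered by the CRC, so that the CRC does not have to be recomputed when the broker assigns a value to this field. The CRC here is computed with CRC-32C.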
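To see why the field order matters, here is a rough sketch of the check. Python's standard library has no CRC-32C, so the sketch carries its own small table-driven implementation; the byte offsets follow from the field sizes listed above (8 + 4 + 4 + 1 = 17 bytes before crc, so attributes starts at byte 21). The batch built at the bottom is synthetic filler, not a real Kafka batch, and verify_batch_crc is a name I made up.

```python
import struct


def _make_table():
    # Table for CRC-32C (Castagnoli), reflected polynomial 0x82F63B78.
    table = []
    for i in range(256):
        crc = i
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
        table.append(crc)
    return table


_TABLE = _make_table()


def crc32c(data):
    """Compute the CRC-32C checksum of a bytes object."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


def verify_batch_crc(batch):
    """Compare the stored crc (bytes 17..20) with CRC-32C of attributes..end."""
    (stored,) = struct.unpack(">I", batch[17:21])
    return stored == crc32c(batch[21:])


# Synthetic batch: attributes (int16) followed by filler instead of real fields.
body = struct.pack(">h", 0) + b"rest of the batch"
# baseOffset, batchLength (bytes after that field), partitionLeaderEpoch, magic
header = struct.pack(">qiib", 0, 4 + 1 + 4 + len(body), 0, 2)
batch = header + struct.pack(">I", crc32c(body)) + body

# Changing partitionLeaderEpoch (bytes 12..15) does not invalidate the CRC.
reassigned = batch[:12] + struct.pack(">i", 5) + batch[16:]
print(verify_batch_crc(batch), verify_batch_crc(reassigned))
```

Because the partition leader epoch lies before the crc field, rewriting it leaves the checksum valid, while changing any byte from attributes onward makes the check fail.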
isvalid: true
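This parameter was added in Kafka 2.4.0, mainly to show more clearly why an error occurred, such as a version ( magic ) mismatch or a CRC checksum error. The detailed cause of the error is returned through the future object RecordMetadata.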
Sources: